home *** CD-ROM | disk | FTP | other *** search
/ MacHack 2000 / MacHack 2000.toast / pc / The Hacks / MacHacksBug / Python 1.5.2c1 / Demo / threads / find.py < prev    next >
Encoding:
Python Source  |  2000-06-23  |  3.5 KB  |  155 lines

  1. # A parallelized "find(1)" using the thread module.
  2.  
  3. # This demonstrates the use of a work queue and worker threads.
  4. # It really does do more stats/sec when using multiple threads,
  5. # although the improvement is only about 20-30 percent.
  6.  
  7. # I'm too lazy to write a command line parser for the full find(1)
  8. # command line syntax, so the predicate it searches for is wired-in,
  9. # see function selector() below.  (It currently searches for files with
  10. # group or world write permission.)
  11.  
  12. # Usage: parfind.py [-w nworkers] [directory] ...
  13. # Default nworkers is 4, maximum appears to be 8 (on Irix 4.0.2)
  14.  
  15.  
  16. import sys
  17. import getopt
  18. import string
  19. import time
  20. import os
  21. from stat import *
  22. import thread
  23.  
  24.  
  25. # Work queue class.  Usage:
  26. #   wq = WorkQ()
  27. #   wq.addwork(func, (arg1, arg2, ...)) # one or more calls
  28. #   wq.run(nworkers)
  29. # The work is done when wq.run() completes.
  30. # The function calls executed by the workers may add more work.
  31. # Don't use keyboard interrupts!
  32.  
  33. class WorkQ:
  34.  
  35.     # Invariants:
  36.  
  37.     # - busy and work are only modified when mutex is locked
  38.     # - len(work) is the number of jobs ready to be taken
  39.     # - busy is the number of jobs being done
  40.     # - todo is locked iff there is no work and somebody is busy
  41.  
  42.     def __init__(self):
  43.         self.mutex = thread.allocate()
  44.         self.todo = thread.allocate()
  45.         self.todo.acquire()
  46.         self.work = []
  47.         self.busy = 0
  48.  
  49.     def addwork(self, func, args):
  50.         job = (func, args)
  51.         self.mutex.acquire()
  52.         self.work.append(job)
  53.         self.mutex.release()
  54.         if len(self.work) == 1:
  55.             self.todo.release()
  56.  
  57.     def _getwork(self):
  58.         self.todo.acquire()
  59.         self.mutex.acquire()
  60.         if self.busy == 0 and len(self.work) == 0:
  61.             self.mutex.release()
  62.             self.todo.release()
  63.             return None
  64.         job = self.work[0]
  65.         del self.work[0]
  66.         self.busy = self.busy + 1
  67.         self.mutex.release()
  68.         if len(self.work) > 0:
  69.             self.todo.release()
  70.         return job
  71.  
  72.     def _donework(self):
  73.         self.mutex.acquire()
  74.         self.busy = self.busy - 1
  75.         if self.busy == 0 and len(self.work) == 0:
  76.             self.todo.release()
  77.         self.mutex.release()
  78.  
  79.     def _worker(self):
  80.         time.sleep(0.00001)    # Let other threads run
  81.         while 1:
  82.             job = self._getwork()
  83.             if not job:
  84.                 break
  85.             func, args = job
  86.             apply(func, args)
  87.             self._donework()
  88.  
  89.     def run(self, nworkers):
  90.         if not self.work:
  91.             return # Nothing to do
  92.         for i in range(nworkers-1):
  93.             thread.start_new(self._worker, ())
  94.         self._worker()
  95.         self.todo.acquire()
  96.  
  97.  
  98. # Main program
  99.  
  100. def main():
  101.     sys.argv.append("/tmp")
  102.     nworkers = 4
  103.     opts, args = getopt.getopt(sys.argv[1:], '-w:')
  104.     for opt, arg in opts:
  105.         if opt == '-w':
  106.             nworkers = string.atoi(arg)
  107.     if not args:
  108.         args = [os.curdir]
  109.  
  110.     wq = WorkQ()
  111.     for dir in args:
  112.         wq.addwork(find, (dir, selector, wq))
  113.  
  114.     t1 = time.time()
  115.     wq.run(nworkers)
  116.     t2 = time.time()
  117.  
  118.     sys.stderr.write('Total time ' + `t2-t1` + ' sec.\n')
  119.  
  120.  
  121. # The predicate -- defines what files we look for.
  122. # Feel free to change this to suit your purpose
  123.  
  124. def selector(dir, name, fullname, stat):
  125.     # Look for group or world writable files
  126.     return (stat[ST_MODE] & 0022) != 0
  127.  
  128.  
  129. # The find procedure -- calls wq.addwork() for subdirectories
  130.  
  131. def find(dir, pred, wq):
  132.     try:
  133.         names = os.listdir(dir)
  134.     except os.error, msg:
  135.         print `dir`, ':', msg
  136.         return
  137.     for name in names:
  138.         if name not in (os.curdir, os.pardir):
  139.             fullname = os.path.join(dir, name)
  140.             try:
  141.                 stat = os.lstat(fullname)
  142.             except os.error, msg:
  143.                 print `fullname`, ':', msg
  144.                 continue
  145.             if pred(dir, name, fullname, stat):
  146.                 print fullname
  147.             if S_ISDIR(stat[ST_MODE]):
  148.                 if not os.path.ismount(fullname):
  149.                     wq.addwork(find, (fullname, pred, wq))
  150.  
  151.  
  152. # Call the main program
  153.  
  154. main()
  155.